package xiaoa.java.jms.rabbitMq;

import java.io.IOException;


import java.util.AbstractMap;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;


import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

import xiaoa.java.jms.rabbitMq.LinkQueueMgr.LinkNode;
import xiaoa.java.log.L;
import xiaoa.java.utils.ExceptionUtils;


/**
 * 消费者
 * @author xiaoa
 * @date 2017年10月21日 上午10:41:18
 * @version V1.0
 *
 */
public class ConsumerQueue  extends QueueBase{

	
	// 消费者map
    private Map<String,Map.Entry<Listen ,RunThread >> consumerMap = new HashMap<>();
	
	public ConsumerQueue(String exchange,String path ,String queueName) throws Throwable {
		super(exchange ,path ,queueName);
	}
	
	
	/**
	 * 
	 * 构造器
	 * <p>Title: </p>
	 * <p>Description: </p>
	 * @author xiaoa
	 * @param zkKey
	 * @throws Throwable
	 */
	public ConsumerQueue(String zkKey) throws Throwable {
		if (zkKey == null){
			throw new NullPointerException("zkKey");
		}
		
		LinkNode  node = LinkQueueMgr.getNode(zkKey);
		
		if (node == null){
			throw new NullPointerException("node");
		}
		
		super.init(node.getExchange() ,node.getRoutingKey() ,node.getQueueName());

		
	}
	
	
	
	
	
	
	
	/**
	 * 是否包含该key
	 * @Title: containKey
	 * @param key
	 * @return
	 * @author xiaoa
	 */
	private boolean containKey(String key){
		synchronized (consumerMap){
			return consumerMap.containsKey(key);
		}
	}
	
	/**
	 * 添加监听
	 * @Title: addListen
	 * @param key
	 * @param listen
	 * @throws Throwable
	 * @author xiaoa
	 */
	public void addListen(final String key ,final Listen listen)throws Throwable{
		
		if (key == null || key.isEmpty()){
			throw new RuntimeException("key 参数有误");
		}
		
		if (listen == null){
			throw new RuntimeException("listen 参数有误");
		}
		
		if (containKey(key)){
			throw new RuntimeException("key Already exist ");
		}
		
		// 指定消费队列  
		final QueueingConsumer consumer = new QueueingConsumer(channel);  
	     
		 // 打开应答机制  
		 channel.basicConsume(queueName, false, consumer);  
		 
		 // 当前队列对象
		 final QueueBase  thisQueue = this;
		 
		 RunThread  run  = new RunThread() {
			 @Override
			public void runWord() throws Throwable {
				 while (!exit()){

						
					 Delivery  delivery =  consumer.nextDelivery();
					 
					 long startTime = System.currentTimeMillis();
					 LinkedHashMap<String, String> logMap = new LinkedHashMap<>();
					 
					 // 消息key
					 logMap.put("tag", delivery.getEnvelope().getDeliveryTag() + "");
					 logMap.put("queueKey", key);
					 logMap.put("start", "start");
					 L.info(logMap);
					 
					 try {
						 
						 // 运行业务
						Object nextValue =  listen.doWord(delivery, logMap);
						
						if (nextValue != null){
							 // 发送到下个节点
							 LinkQueueMgr.doPushNext(delivery.getProperties().getHeaders(), nextValue, thisQueue);
						}
						 
						 // 消息确认成功
						 messageAck(delivery, logMap);
						 
					     logMap.put("start", "succ");
						 
					} catch (Throwable e) {
						logMap.put("start", "error");
						logMap.put("errorMsg", ExceptionUtils.exceptionToString(e));
						try {
							messageAckError(delivery, logMap);
						} catch (IOException e1) {
							logMap.put("errorMsg", ExceptionUtils.exceptionToString(e1));
						}
						
						doSendExceMssage(delivery);
						
				   }finally {
						L.info(startTime, System.currentTimeMillis(), logMap);

				   }
				 }
			}
		 };
		 
		// 放入消息map
	    consumerMap.put(key, new AbstractMap.SimpleEntry<>(listen, run));
		 
		Thread  thread =  new Thread(run);
		thread.setName("Consumer_" + key);
		
		thread.start();
		
	}
	
	/**
	 * 消息正确处理时确认消息
	 * @Title: messageAck
	 * @author xiaoa
	 * @throws IOException 
	 */
	void messageAck(QueueingConsumer.Delivery delivery  , Map<String, String>  logMap ) throws IOException{
		channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		
	}
	
	/**
	 * 消息没有被正确处理时确认消息
	 * @Title: messageAckError
	 * @author xiaoa
	 * @throws IOException 
	 */
	void messageAckError(QueueingConsumer.Delivery delivery  , Map<String, String>  logMap ) throws IOException{
		channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
	}


	/**
	 * 删除监听
	 * @Title: removeListen
	 * @param key
	 * @author xiaoa
	 */
	public void removeListen(String key){
		
		synchronized (consumerMap) {
			
			Map.Entry<Listen ,RunThread >  run =  consumerMap.get(key);
			
			if (run == null){
				return ;
			}
			
			// 退出线程
			run.getValue().exitRun();
			
			// 删除
			consumerMap.remove(key);
		}
		
	}
	
	
	/**
	 * 监听
	 * @author xiaoa
	 * @date 2017年10月21日 上午10:42:42
	 * @version V1.0
	 *
	 */
	public  static  interface  Listen{
		
		/**
		 * 工作
		 * @Title: doWord
		 * @param delivery
		 * @param logMap
		 * @throws Throwable
		 * @author xiaoa
		 */
		Object doWord(QueueingConsumer.Delivery delivery  , Map<String, String>  logMap )throws Throwable;
		
	}
	
	
	/**
	 * 消费线程
	 * @author xiaoa
	 * @date 2017年10月21日 上午11:25:38
	 * @version V1.0
	 *
	 */
	private abstract class RunThread implements Runnable{
		
		/**
		 * 是否退出
		 */
		AtomicBoolean  exit = new AtomicBoolean(false);
		 
		/**
		 * 退出运行
		 * @Title: exitRun
		 * @author xiaoa
		 */
		 public void exitRun(){
			 exit.set(true);
		 }
		
		 /**
		  * 是否退出
		  * @Title: exit
		  * @return
		  * @author xiaoa
		  */
		 public boolean exit(){
			 return exit.get();
		 }
		
		 /**
		  * 工作
		  * @Title: runWord
		  * @throws Throwable
		  * @author xiaoa
		  */
		abstract void runWord()throws Throwable;
		
		@Override
		public void run() {
			try {
				
				runWord();
			} catch (Throwable e) {
				e.printStackTrace();
				L.info("RunThreadName = " + Thread.currentThread().getName() + " : " + ExceptionUtils.exceptionToString(e) +  " exit!");
			}
			
		}
		
	}
	
}
